package edu.nepu.flink.api.sideoutput;

import edu.nepu.flink.api.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

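/**
 * Flink demo that sends water-level warnings to a side output while the main
 * stream carries every sensor reading.
 *
 * @Date 2024/3/2 10:42
 * @Created by chenshuaijun
 */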
public class SideOutputDemo {

    /** Host of the socket the text source connects to. */
    private static final String SOURCE_HOST = "hadoop102";

    /** Port of the socket the text source connects to. */
    private static final int SOURCE_PORT = 9999;

    /** Out-of-orderness bound handed to the bounded-out-of-orderness watermark strategy. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(2);

    /** A reading whose {@code vc} is strictly greater than this value triggers a warning. */
    private static final int WARNING_THRESHOLD = 10;

    /**
     * Tag identifying the warning side output. It is created as an anonymous subclass
     * (note the trailing <code>{}</code>), exactly as the original local tag was.
     */
    private static final OutputTag<String> WARNING_TAG = new OutputTag<String>("warning") {};

    /**
     * Builds and runs the job: reads comma-separated lines from a socket, parses them into
     * {@link WaterSensor} records, assigns event-time timestamps and watermarks, keys the
     * stream by sensor id and, for every sensor, emits a warning on a side output when the
     * water level exceeds {@link #WARNING_THRESHOLD}. Every reading is also forwarded on
     * the main output.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                .withTimestampAssigner(new SensorTimestampAssigner());

        SingleOutputStreamOperator<WaterSensor> source = env
                .socketTextStream(SOURCE_HOST, SOURCE_PORT)
                .map(new SensorLineParser())
                .assignTimestampsAndWatermarks(watermarkStrategy);

        // Demo case: for each sensor, emit a warning message when the water level exceeds 10.
        SingleOutputStreamOperator<String> process = source
                .keyBy(WaterSensor::getId)
                .process(new WarningSplitter());

        // Main output goes to stdout, warnings go to stderr.
        process.print("主流输出");
        process.getSideOutput(WARNING_TAG).printToErr("报警信息");

        env.execute();
    }

    /**
     * Parses one comma-separated text line into a {@link WaterSensor}.
     * The three fields are passed to the constructor in order
     * (presumably id, ts, vc - confirm against {@code WaterSensor}).
     */
    private static final class SensorLineParser implements MapFunction<String, WaterSensor> {

        /**
         * @param value raw input line with at least three comma-separated fields
         * @return the parsed sensor record
         * @throws IllegalArgumentException if the line has fewer than three fields
         *         or a numeric field cannot be parsed ({@link NumberFormatException})
         */
        @Override
        public WaterSensor map(String value) throws Exception {
            String[] fields = value.split(",");
            if (fields.length < 3) {
                // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException(
                        "Expected at least 3 comma-separated fields but got " + fields.length + ": '" + value + "'");
            }
            // Trim only the numeric fields: surrounding blanks or a trailing '\r' would
            // otherwise make parseLong/parseInt throw. The first field is passed through
            // unchanged so existing keys are not altered.
            long second = Long.parseLong(fields[1].trim());
            int third = Integer.parseInt(fields[2].trim());
            return new WaterSensor(fields[0], second, third);
        }
    }

    /** Supplies the event-time timestamp of a {@link WaterSensor} record. */
    private static final class SensorTimestampAssigner implements SerializableTimestampAssigner<WaterSensor> {

        /**
         * @param element the sensor record
         * @param recordTimestamp timestamp previously attached to the record (ignored)
         * @return {@code element.getTs()} multiplied by 1000
         *         (presumably seconds converted to milliseconds - confirm the unit of {@code ts})
         */
        @Override
        public long extractTimestamp(WaterSensor element, long recordTimestamp) {
            return element.getTs() * 1000;
        }
    }

    /**
     * Forwards every reading on the main output and additionally writes a warning
     * to {@link #WARNING_TAG} when the reading's {@code vc} exceeds {@link #WARNING_THRESHOLD}.
     */
    private static final class WarningSplitter extends KeyedProcessFunction<String, WaterSensor, String> {

        /**
         * @param value the incoming sensor reading
         * @param ctx context used to write to the side output
         * @param out collector for the main output
         */
        @Override
        public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
            if (value.vc > WARNING_THRESHOLD) {
                ctx.output(WARNING_TAG, "id为" + value.id + " 水位值为 " + value.vc + " 超过了" + WARNING_THRESHOLD + ",所以要发出报警了");
            }
            // The reading always goes to the main output, whether or not a warning was raised.
            out.collect(value.toString());
        }
    }
}
